Struct PipelineBuilder

pub struct PipelineBuilder { /* private fields */ }

Used to construct a Pipeline.

Can be created using Pipeline::builder.

Implementations§

impl PipelineBuilder

pub fn with_inputs<S, I>(self, pipe: S, inputs: Vec<I>) -> Self
where S: AsRef<str>, I: Send + 'static,

A “producer” stage; registers a list of inputs to be written to a provided pipe.

The string provided to pipe defines where the values will be written to. The values will be written one at a time into the pipe.

§Returns

This pipeline builder.

pub fn with_branching_inputs<S>( self, pipes: Vec<S>, inputs: Vec<Vec<BoxedAnySend>>, ) -> Self
where S: AsRef<str>,

A “producer” stage; registers a list of multiple inputs to be written to a list of corresponding pipes.

The strings provided to pipes define where each input will go. The values will be written one at a time into each pipe.

For example, say you have the following:

List of multiple inputs: [ [1, "hi", true], [2, "bye", false], [3, ".", false] ]
List of pipes: [ "numbers", "strings", "bools" ]

The inputs would be sent to the pipes like this:

Pipe         1st   2nd    3rd
"numbers" <- 1     2      3
"strings" <- "hi"  "bye"  "."
"bools"   <- true  false  false
§Returns

This pipeline builder.
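
For with_branching_inputs, the routing shown in the table above can be modelled without the crate. The following is a std-only sketch, not the crate's implementation: route_rows and boxed are hypothetical helpers, and Box<dyn Any + Send> stands in for BoxedAnySend on the assumption that it is a boxed, type-erased Send value.

```rust
use std::any::Any;
use std::collections::HashMap;

/// Stand-in for the crate's `BoxedAnySend` (assumed shape, not the real type).
type Boxed = Box<dyn Any + Send>;

/// Hypothetical helper that type-erases a value.
fn boxed<T: Any + Send>(value: T) -> Boxed {
    Box::new(value)
}

/// Hypothetical helper: sends the j-th value of every row to the j-th pipe,
/// preserving row order within each pipe.
fn route_rows(pipes: &[&str], rows: Vec<Vec<Boxed>>) -> HashMap<String, Vec<Boxed>> {
    let mut routed: HashMap<String, Vec<Boxed>> = HashMap::new();
    for row in rows {
        for (pipe, value) in pipes.iter().zip(row) {
            routed.entry(pipe.to_string()).or_default().push(value);
        }
    }
    routed
}

fn main() {
    let rows = vec![
        vec![boxed(1), boxed("hi"), boxed(true)],
        vec![boxed(2), boxed("bye"), boxed(false)],
        vec![boxed(3), boxed("."), boxed(false)],
    ];
    let routed = route_rows(&["numbers", "strings", "bools"], rows);

    // Recover the concrete type on the receiving side by downcasting.
    let numbers: Vec<i32> = routed["numbers"]
        .iter()
        .map(|v| *v.downcast_ref::<i32>().expect("expected an i32"))
        .collect();
    assert_eq!(numbers, vec![1, 2, 3]);
    assert_eq!(routed["bools"].len(), 3);
}
```

Each inner list acts as one row with one value per pipe, so the first pipe receives the first value of every row, in row order.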

pub fn with_producer<S, I, F, Fut>(self, pipe: S, task: F) -> Self
where S: AsRef<str>, I: Send + 'static, F: FnMut() -> Fut + Send + 'static, Fut: Future<Output = Option<I>> + Send + 'static,

A “producer” stage; registers a stage that produces values and writes them into a pipe.

The string provided to pipe defines where the produced values will be written.

The producer will continue producing values while the user-provided task function returns Some. This means that it is possible to create an infinite stream of values by simply never returning None.
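
The stop-on-None behaviour can be illustrated with a synchronous, std-only model. This sketch deliberately ignores the asynchronous part (the real task returns a future), and run_producer is a hypothetical helper rather than anything the crate exposes.

```rust
/// Hypothetical, synchronous model of a producer loop: keeps calling `task`
/// and forwarding values until it returns `None`.
fn run_producer<I>(mut task: impl FnMut() -> Option<I>) -> Vec<I> {
    let mut pipe = Vec::new();
    while let Some(value) = task() {
        pipe.push(value);
    }
    pipe
}

fn main() {
    // The task owns mutable state, which is why the bound is `FnMut`.
    let mut next = 0;
    let produced = run_producer(move || {
        if next < 3 {
            next += 1;
            Some(next)
        } else {
            None // ends the stream
        }
    });
    assert_eq!(produced, vec![1, 2, 3]);
}
```

A task that never returns None would keep this loop running forever, which is the infinite-stream case described above.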

§Returns

This pipeline builder.

pub fn with_branching_producer<S, F, Fut>(self, pipes: Vec<S>, task: F) -> Self
where S: AsRef<str>, F: FnMut() -> Fut + Send + 'static, Fut: Future<Output = Option<Vec<Option<BoxedAnySend>>>> + Send + 'static,

A “producer” stage; registers a new stage that produces multiple values and writes them into their respective pipes.

The strings provided to pipes define where each produced value will go.

The producer will continue producing values while the user-provided task function returns Some. This means that it is possible to create an infinite stream of values by simply never returning None.

Each individual Option within the task output determines whether a value will be sent to the corresponding pipe. If Some is specified, the inner value will be sent; if None is specified, nothing will be sent.

As with all stages that have more than one output (“branching” stages), each output may have a different type. To avoid the large binary sizes that static dispatch would cause, dynamic dispatch is used instead via the BoxedAnySend type. For examples of how to return these values from task functions, see BoxedAnySend’s examples.
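
To show why type erasure lets a single output list carry different types, here is a std-only sketch. It assumes BoxedAnySend behaves like Box<dyn Any + Send>; the Boxed alias and the task_output function are hypothetical stand-ins, and the real task would return this value from a future.

```rust
use std::any::Any;

/// Stand-in for the crate's `BoxedAnySend` (assumed shape, not the real type).
type Boxed = Box<dyn Any + Send>;

/// Hypothetical task output: one slot per pipe; `None` skips that pipe this round.
fn task_output(n: i32) -> Vec<Option<Boxed>> {
    let number: Boxed = Box::new(n);
    let label: Boxed = Box::new(format!("item {n}"));
    // Only even numbers are forwarded to the second pipe.
    vec![Some(number), if n % 2 == 0 { Some(label) } else { None }]
}

fn main() {
    let row = task_output(4);
    // The receiving side recovers each concrete type by downcasting.
    let number = row[0].as_ref().and_then(|v| v.downcast_ref::<i32>());
    let label = row[1].as_ref().and_then(|v| v.downcast_ref::<String>());
    assert_eq!(number, Some(&4));
    assert_eq!(label.map(String::as_str), Some("item 4"));

    // An odd input leaves the second slot empty, so nothing would be sent there.
    assert!(task_output(3)[1].is_none());
}
```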

§Returns

This pipeline builder.

pub fn with_consumer<S, I, F, Fut>( self, pipe: S, options: WorkerOptions, task: F, ) -> Self
where S: AsRef<str>, I: Send + 'static, F: Fn(I) -> Fut + Send + Sync + 'static, Fut: Future<Output = ()> + Send + 'static,

A “consumer” stage; registers a new stage that consumes values from a pipe.

The string provided to pipe defines where values will be read from.

The consumer will continue consuming values until the pipeline is terminated or the pipe it is receiving from is closed.

§Returns

This pipeline builder.

pub fn with_stage<I, O, F, Fut>( self, input_pipe: impl AsRef<str>, output_pipe: impl AsRef<str>, options: WorkerOptions, task: F, ) -> Self
where I: Send + 'static, O: Send + 'static, F: Fn(I) -> Fut + Send + Sync + 'static, Fut: Future<Output = Option<O>> + Send + 'static,

A “regular” stage; registers a new stage that operates on an input and produces a single output value that is written to a pipe.

The string provided to input_pipe defines where values will be read from. The string provided to output_pipe defines where the produced output will go.

The worker will continue working on input values until the pipeline is terminated or the pipe it is receiving from is closed.

The Option returned by the task function determines whether a value will be sent to the output pipe. If Some is specified, the inner value will be sent; if None is specified, nothing will be sent.
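
In effect, a stage that returns Option behaves like a filter-and-map step. The sketch below is a synchronous, std-only model of that behaviour; run_stage is a hypothetical helper, and the real task function returns a future rather than a plain Option.

```rust
/// Hypothetical, synchronous model of a stage: applies `task` to each input and
/// forwards only the `Some` results to the output pipe.
fn run_stage<I, O>(inputs: Vec<I>, task: impl Fn(I) -> Option<O>) -> Vec<O> {
    inputs.into_iter().filter_map(task).collect()
}

fn main() {
    // Parse each string; inputs that fail to parse produce no output.
    let outputs = run_stage(vec!["1", "x", "3"], |s: &str| s.parse::<i32>().ok());
    assert_eq!(outputs, vec![1, 3]);
}
```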

§Returns

This pipeline builder.

pub fn with_branching_stage<I, F, Fut>( self, input_pipe: impl AsRef<str>, output_pipes: Vec<impl AsRef<str>>, options: WorkerOptions, task: F, ) -> Self
where I: Send + 'static, F: Fn(I) -> Fut + Send + Sync + 'static, Fut: Future<Output = Option<Vec<Option<BoxedAnySend>>>> + Send + 'static,

A “regular” stage; registers a new stage that operates on an input and produces multiple values that are written into their respective pipes.

The string provided to input_pipe defines where values will be read from. The strings provided to output_pipes define where each produced output will go.

The worker will continue working on input values until the pipeline is terminated or the pipe it is receiving from is closed.

  • If the user-defined task function returns None, nothing will be done.
  • If it returns Some, the inner value (Vec<Option<BoxedAnySend>>) will have the following applied to each output option:
    • If Some is specified, the inner value will be sent to the corresponding pipe.
    • If None is specified, nothing will be sent.
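
The two levels of Option in the list above can be modelled as follows. This is a std-only sketch with a hypothetical dispatch helper; the generic T stands in for the type-erased BoxedAnySend values, and a HashMap of vectors stands in for the output pipes.

```rust
use std::collections::HashMap;

/// Hypothetical model of how one task result is applied to the output pipes.
fn dispatch<T>(
    pipes: &[&str],
    result: Option<Vec<Option<T>>>,
    sent: &mut HashMap<String, Vec<T>>,
) {
    // Outer `None`: the task produced nothing at all, so no pipe is touched.
    if let Some(outputs) = result {
        for (pipe, output) in pipes.iter().zip(outputs) {
            // Inner `None`: skip this pipe only.
            if let Some(value) = output {
                sent.entry(pipe.to_string()).or_default().push(value);
            }
        }
    }
}

fn main() {
    let pipes = ["evens", "odds"];
    let mut sent = HashMap::new();
    dispatch(&pipes, Some(vec![Some(2), None]), &mut sent);
    dispatch(&pipes, None, &mut sent);
    dispatch(&pipes, Some(vec![None, Some(3)]), &mut sent);
    assert_eq!(sent["evens"], vec![2]);
    assert_eq!(sent["odds"], vec![3]);
}
```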

As with all stages that have more than one output (“branching” stages), each output may have a different type. To avoid the large binary sizes that static dispatch would cause, dynamic dispatch is used instead via the BoxedAnySend type. For examples of how to return these values from task functions, see BoxedAnySend’s examples.

§Returns

This pipeline builder.

pub fn with_flattener<It>( self, from_pipe: impl AsRef<str>, to_pipe: impl AsRef<str>, ) -> Self
where It: IntoIterator + Send + 'static, It::Item: Send,

An “iterator-based” stage; registers a new stage that takes the data from one pipe and “flattens” it, feeding the results into another.

This is useful if you have a pipe that produces a list of values in a single task execution, but you want to use it as input to another stage that takes only the individual values.

The generic parameter tells the pipeline builder which concrete type to cast each value to, which means turbofish syntax is needed to specify the iterator type carried by that pipe. For example: Pipeline::builder().with_flattener::<Vec<u8>>("data", "bytes")

The string provided to from_pipe defines where the iterator of values will be read from. The string provided to to_pipe defines where the individual values will go.

The worker will continue working until the pipeline is terminated or the pipe it is receiving from is closed.
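
As a rough mental model, flattening corresponds to Iterator::flatten applied to the stream of values in the source pipe. The std-only sketch below uses a hypothetical flatten_pipe helper with the same IntoIterator bound; it is an illustration of the idea, not the crate's implementation.

```rust
/// Hypothetical, synchronous model of a flattener: every item read from the
/// source pipe is an `IntoIterator`, and its elements are forwarded one by one.
fn flatten_pipe<It>(from_pipe: Vec<It>) -> Vec<It::Item>
where
    It: IntoIterator,
{
    from_pipe.into_iter().flatten().collect()
}

fn main() {
    // The turbofish names the collection type carried by the source pipe.
    let numbers = flatten_pipe::<Vec<i32>>(vec![vec![1, 2], vec![3, 4, 5]]);
    assert_eq!(numbers, vec![1, 2, 3, 4, 5]);
}
```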

§Examples
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
use async_pipes::Pipeline;
use async_pipes::WorkerOptions;

#[tokio::main]
async fn main() {
    let sum = Arc::new(AtomicI32::new(0));
    let task_sum = sum.clone();

    Pipeline::builder()
        .with_inputs("NumberSets", vec![vec![1, 2], vec![3, 4, 5]])
        .with_flattener::<Vec<i32>>("NumberSets", "Numbers")
        .with_consumer("Numbers", WorkerOptions::default_single_task(), move |value: i32| {
            let sum = task_sum.clone();
            async move {
                sum.fetch_add(value, Ordering::SeqCst);
            }
        })
        .build()
        .expect("failed to build pipeline")
        .wait()
        .await;

    assert_eq!(sum.load(Ordering::Acquire), 15);
}
§Returns

This pipeline builder.

pub fn also( self, handler: impl FnOnce(PipelineBuilder) -> PipelineBuilder, ) -> Self

A utility function for improving readability when building pipelines.

This makes splitting up task definitions into functions easier by allowing the caller to pass in a function that takes self and returns self, effectively providing continuity to a call chain.

§Examples
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
use async_pipes::Pipeline;
use async_pipes::PipelineBuilder;

#[tokio::main]
async fn main() {
    let sum = Arc::new(AtomicI32::new(0));

    Pipeline::builder()
        .with_inputs("Numbers", vec![1, 2, 3, 4, 5])
        .also(build_consumer(sum.clone()))
        .build()
        .expect("failed to build pipeline")
        .wait()
        .await;

    assert_eq!(sum.load(Ordering::Acquire), 15);
}

fn build_consumer(sum: Arc<AtomicI32>) -> impl FnOnce(PipelineBuilder) -> PipelineBuilder {
    use async_pipes::WorkerOptions;
    |builder| builder.with_consumer("Numbers", WorkerOptions::default(), move |value: i32| {
        let sum = sum.clone();
        async move {
            sum.fetch_add(value, Ordering::SeqCst);
        }
    })
}

pub fn build(self) -> Result<Pipeline, String>

Builds the pipeline. Returns the pipeline if it was built successfully; otherwise, returns an error describing why it could not be built.

§Errors
  1. A pipe is “open-ended”, meaning there’s no stage consuming values from that pipe.
  2. The reader of a pipe was used more than once.
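
The two error cases can be pictured as a check over the pipe names used while building. The sketch below is a std-only guess at what such a check might look like: validate is a hypothetical function, the error wording is illustrative, and nothing here is taken from the crate's source.

```rust
use std::collections::HashSet;

/// Hypothetical check mirroring the two documented error cases.
/// `written` lists pipes that some stage writes to; `read` lists the pipe
/// each reading stage consumes from (one entry per reading stage).
fn validate(written: &[&str], read: &[&str]) -> Result<(), String> {
    let mut readers = HashSet::new();
    for &pipe in read {
        // `insert` returns false when the pipe already has a reader.
        if !readers.insert(pipe) {
            return Err(format!("pipe '{pipe}' is read by more than one stage"));
        }
    }
    for &pipe in written {
        if !readers.contains(pipe) {
            return Err(format!("pipe '{pipe}' is open-ended: nothing reads from it"));
        }
    }
    Ok(())
}

fn main() {
    // Every written pipe has exactly one reader.
    assert!(validate(&["NumberSets", "Numbers"], &["NumberSets", "Numbers"]).is_ok());
    // "Numbers" is written to but never consumed.
    assert!(validate(&["NumberSets", "Numbers"], &["NumberSets"]).is_err());
    // "Numbers" has two readers.
    assert!(validate(&["Numbers"], &["Numbers", "Numbers"]).is_err());
}
```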

Trait Implementations§

impl Default for PipelineBuilder

fn default() -> PipelineBuilder

Returns the “default value” for a type.

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.