pub struct PipelineBuilder { /* private fields */ }
Expand description
Used to construct a Pipeline.
Can be created using Pipeline::builder.
Implementations§
Source§impl PipelineBuilder
impl PipelineBuilder
Sourcepub fn with_inputs<S, I>(self, pipe: S, inputs: Vec<I>) -> Self
pub fn with_inputs<S, I>(self, pipe: S, inputs: Vec<I>) -> Self
A “producer” stage; registers a list of inputs to be written to a provided pipe.
The string provided to pipe
defines where the values will be written to.
The values will be written one at a time into the pipe.
§Returns
This pipeline builder.
Sourcepub fn with_branching_inputs<S>(
self,
pipes: Vec<S>,
inputs: Vec<Vec<BoxedAnySend>>,
) -> Self
pub fn with_branching_inputs<S>( self, pipes: Vec<S>, inputs: Vec<Vec<BoxedAnySend>>, ) -> Self
A “producer” stage; registers a list of multiple inputs to be written to a list of corresponding pipes.
The strings provided to pipes
define where each input will go.
The values will be written one at a time into each pipe.
For example, say you have the following:
List of multiple inputs: [ [1, "hi", true], [2, "bye", false], [3, ".", false] ]
List of pipes: [ "numbers", "strings", "bools" ]
The inputs would be sent to the pipes like this:
Pipe 1st 2nd 3rd
"numbers" <- 1 2 3
"strings" <- "hi" "bye" "."
"bools" <- true false false
§Returns
This pipeline builder.
Sourcepub fn with_producer<S, I, F, Fut>(self, pipe: S, task: F) -> Self
pub fn with_producer<S, I, F, Fut>(self, pipe: S, task: F) -> Self
A “producer” stage; registers a stage that produces values and writes them into a pipe.
The strings provided to pipes
define where each input will go.
The producer will continue producing values while the user-provided task function returns Some. This means that it is possible to create an infinite stream of values by simply never returning None.
§Returns
This pipeline builder.
Sourcepub fn with_branching_producer<S, F, Fut>(self, pipes: Vec<S>, task: F) -> Self
pub fn with_branching_producer<S, F, Fut>(self, pipes: Vec<S>, task: F) -> Self
A “producer” stage; registers a new stage that produces multiple values and writes them into their respective pipe.
The strings provided to pipes
define where each input will go.
The producer will continue producing values while the user-provided task function returns Some. This means that it is possible to create an infinite stream of values by simply never returning None.
Each individual Option within the task output determines whether it will be sent to the corresponding pipe. If Some is specified, the inner value will be sent, if None is specified, nothing will be sent.
As with all stages that have more than one (“branching”) outputs, it’s possible that each output could have a different type, and so to avoid large binary sizes from static dispatching, dynamic dispatching is used instead, utilizing the BoxedAnySend type. For examples on how to return these types of values in task functions, see BoxedAnySend’s examples.
§Returns
This pipeline builder.
Sourcepub fn with_consumer<S, I, F, Fut>(
self,
pipe: S,
options: WorkerOptions,
task: F,
) -> Self
pub fn with_consumer<S, I, F, Fut>( self, pipe: S, options: WorkerOptions, task: F, ) -> Self
A “consumer” stage; registers a new stage that consumes values from a pipe.
The string provided to pipe
define where values will be read from.
The consumer will continue consuming values until the pipeline is terminated or the pipe it is receiving from is closed.
§Returns
This pipeline builder.
Sourcepub fn with_stage<I, O, F, Fut>(
self,
input_pipe: impl AsRef<str>,
output_pipe: impl AsRef<str>,
options: WorkerOptions,
task: F,
) -> Self
pub fn with_stage<I, O, F, Fut>( self, input_pipe: impl AsRef<str>, output_pipe: impl AsRef<str>, options: WorkerOptions, task: F, ) -> Self
A “regular” stage; registers a new stage that operates on an input and produce a single output value that is written to a pipe.
The string provided to input_pipe
defines where values will be read from.
The string provided to output_pipe
defines where the produced output will go.
The worker will continue working on input values until the pipeline is terminated or the pipe it is receiving from is closed.
The Option returned by the task function determines whether it will be sent to the output pipe. If Some is specified, the inner value will be sent, if None is specified, nothing will be sent.
§Returns
This pipeline builder.
Sourcepub fn with_branching_stage<I, F, Fut>(
self,
input_pipe: impl AsRef<str>,
output_pipes: Vec<impl AsRef<str>>,
options: WorkerOptions,
task: F,
) -> Self
pub fn with_branching_stage<I, F, Fut>( self, input_pipe: impl AsRef<str>, output_pipes: Vec<impl AsRef<str>>, options: WorkerOptions, task: F, ) -> Self
A “regular” stage; registers a new stage that operates on an input and produces multiple values that are written into their respective pipe.
The string provided to input_pipe
defines where values will be read from.
The strings provided to output_pipes
define where each produced output will go.
The worker will continue working on input values until the pipeline is terminated or the pipe it is receiving from is closed.
- If the user-defined task function returns None, nothing will be done.
- If it returns Some, the inner value (
Vec<Option<BoxedAnySend>>
) will have the following applied to each output option:
As with all stages that have more than one (“branching”) outputs, it’s possible that each output could have a different type, and so to avoid large binary sizes from static dispatching, dynamic dispatching is used instead, utilizing the BoxedAnySend type. For examples on how to return these types of values in task functions, see BoxedAnySend’s examples.
§Returns
This pipeline builder.
Sourcepub fn with_flattener<It>(
self,
from_pipe: impl AsRef<str>,
to_pipe: impl AsRef<str>,
) -> Self
pub fn with_flattener<It>( self, from_pipe: impl AsRef<str>, to_pipe: impl AsRef<str>, ) -> Self
An “iterator-based” stage; registers a new stage that takes the data from one pipe and “flattens” it, feeding the results into another.
This is useful if you have a pipe that produces a list of values in a single task execution, but you want to use it as input to another stage that takes only the individual values.
The generic parameter is used by the pipeline builder to know what concrete type to
cast the value to, which mean turbofish syntax will be needed to specify what the iterator
type of that pipe is, for example:
Pipeline::builder().with_flattener::<Vec<u8>>("data", "bytes")
The string provided to from_pipe
defines where the iterator of values will be read from.
The string provided to to_pipe
defines where the individual values will go.
The worker will continue working until the pipeline is terminated or the pipe it is receiving from is closed.
§Examples
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
use async_pipes::Pipeline;
use async_pipes::WorkerOptions;
#[tokio::main]
async fn main() {
let sum = Arc::new(AtomicI32::new(0));
let task_sum = sum.clone();
Pipeline::builder()
.with_inputs("NumberSets", vec![vec![1, 2], vec![3, 4, 5]])
.with_flattener::<Vec<i32>>("NumberSets", "Numbers")
.with_consumer("Numbers", WorkerOptions::default_single_task(), move |value: i32| {
let sum = task_sum.clone();
async move {
sum.fetch_add(value, Ordering::SeqCst);
}
})
.build()
.expect("failed to build pipeline")
.wait()
.await;
assert_eq!(sum.load(Ordering::Acquire), 15);
}
§Returns
This pipeline builder.
Sourcepub fn also(
self,
handler: impl FnOnce(PipelineBuilder) -> PipelineBuilder,
) -> Self
pub fn also( self, handler: impl FnOnce(PipelineBuilder) -> PipelineBuilder, ) -> Self
A utility function for improving readability when building pipelines.
This makes splitting up task definitions into functions easier by allowing the caller
to pass in a function that takes self
and returns self
, effectively providing
continuity to a call chain.
§Examples
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
use async_pipes::Pipeline;
use async_pipes::PipelineBuilder;
#[tokio::main]
async fn main() {
let sum = Arc::new(AtomicI32::new(0));
Pipeline::builder()
.with_inputs("Numbers", vec![1, 2, 3, 4, 5])
.also(build_consumer(sum.clone()))
.build()
.expect("failed to build pipeline")
.wait()
.await;
assert_eq!(sum.load(Ordering::Acquire), 15);
}
fn build_consumer(sum: Arc<AtomicI32>) -> impl FnOnce(PipelineBuilder) -> PipelineBuilder {
use async_pipes::WorkerOptions;
|builder| builder.with_consumer("Numbers", WorkerOptions::default(), move |value: i32| {
let sum = sum.clone();
async move {
sum.fetch_add(value, Ordering::SeqCst);
}
})
}
Sourcepub fn build(self) -> Result<Pipeline, String>
pub fn build(self) -> Result<Pipeline, String>
When the pipeline is ready to be built, this is called and will return a pipeline if it was successfully built, otherwise it will return an error describing why it could not be built.
§Errors
- A pipe is “open-ended”, meaning there’s no stage consuming values from that pipe.
- The reader of a pipe was used more than once.
Trait Implementations§
Source§impl Default for PipelineBuilder
impl Default for PipelineBuilder
Source§fn default() -> PipelineBuilder
fn default() -> PipelineBuilder
Auto Trait Implementations§
impl Freeze for PipelineBuilder
impl !RefUnwindSafe for PipelineBuilder
impl Send for PipelineBuilder
impl !Sync for PipelineBuilder
impl Unpin for PipelineBuilder
impl !UnwindSafe for PipelineBuilder
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read more