Struct async_pipes::Pipeline
source · pub struct Pipeline { /* private fields */ }
Expand description
A pipeline defines the infrastructure for managing stage workers and transferring data between them using pipes defined by the workers.
Examples
Creating a single producer and a single consumer.
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::Ordering::Acquire;
use tokio::sync::Mutex;
use async_pipes::{Pipeline, PipeReader, Pipes, PipeWriter};
tokio_test::block_on(async {
let (mut pipeline, mut pipes): (Pipeline, Pipes) = Pipeline::from_pipes(vec!["Pipe"]);
let (writer, reader) = pipes.create_io::<usize>("Pipe").unwrap();
// Produce values 1 through 10
let count = Arc::new(Mutex::new(0));
pipeline.register_producer("Producer", writer, || async move {
let mut count = count.lock().await;
if *count < 10 {
*count += 1;
Some(*count)
} else {
None
}
});
let sum = Arc::new(AtomicUsize::new(0));
let task_sum = sum.clone();
pipeline.register_consumer("Consumer", reader, |n: usize| async move {
task_sum.fetch_add(n, Ordering::SeqCst);
});
pipeline.wait().await;
assert_eq!(sum.load(Acquire), 55);
});
Creating a branching producer and two consumers for each branch.
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::Ordering::Acquire;
use tokio::sync::Mutex;
use async_pipes::{Pipeline , Pipes };
tokio_test::block_on(async {
let (mut pipeline, mut pipes): (Pipeline, Pipes) = Pipeline::from_pipes(vec!["evens", "odds"]);
let (evens_w, evens_r) = pipes.create_io::<usize>("evens").unwrap();
let (odds_w, odds_r) = pipes.create_io::<usize>("odds").unwrap();
let count = Arc::new(Mutex::new(0));
pipeline.register_branching_producer("producer", vec![evens_w, odds_w], || async move {
let mut count = count.lock().await;
if *count >= 10 {
return None;
}
*count += 1;
let output = if *count % 2 == 0 {
vec![Some(*count), None]
} else {
vec![None, Some(*count)]
};
Some(output)
});
let odds_sum = Arc::new(AtomicUsize::new(0));
let task_odds_sum = odds_sum.clone();
pipeline.register_consumer("odds-consumer", odds_r, |n: usize| async move {
task_odds_sum.fetch_add(n, Ordering::SeqCst);
});
let evens_sum = Arc::new(AtomicUsize::new(0));
let task_evens_sum = evens_sum.clone();
pipeline.register_consumer("evens-consumer", evens_r, |n: usize| async move {
task_evens_sum.fetch_add(n, Ordering::SeqCst);
});
pipeline.wait().await;
assert_eq!(odds_sum.load(Acquire), 25);
assert_eq!(evens_sum.load(Acquire), 30);
});
Implementations§
source§impl Pipeline
impl Pipeline
sourcepub fn from_pipes<S: AsRef<str>>(names: Vec<S>) -> (Self, Pipes)
pub fn from_pipes<S: AsRef<str>>(names: Vec<S>) -> (Self, Pipes)
Create a new pipeline using the list of pipe names.
Using the names, an instance of Pipes will also be created which can be used to create the I/O objects for each pipe. These objects can be given to Pipeline::register_branching calls which tell the pipeline to connect that pipe to the stage worker being registered.
Providing no names will result in a pipeline that can’t transfer any data.
sourcepub async fn wait(self)
pub async fn wait(self)
Wait for the pipeline to complete.
Once the pipeline is complete, a termination signal is sent to to all the workers.
A pipeline progresses to completion by doing the following:
- Wait for all “producers” to complete while also progressing stage workers
- Wait for either all the stage workers to complete, or wait for the internal synchronizer to notify of completion (i.e. there’s no more data flowing through the pipeline)
Step 1 implies that if the producers never finish, the pipeline will run forever. See Pipeline::register_producer for more info.
sourcepub fn register_inputs<O: Send + 'static>(
&mut self,
name: impl Into<String>,
writer: PipeWriter<O>,
inputs: Vec<O>
)
pub fn register_inputs<O: Send + 'static>( &mut self, name: impl Into<String>, writer: PipeWriter<O>, inputs: Vec<O> )
Register a set of inputs to be written to a provided pipe.
This effectively creates a producer stage internally, looping over each input and writing it to the pipe.
Arguments
name
- For debugging purposes; provide an identifying string for this stage.writer
- Created by Pipes::create_io, where each input is written to.inputs
- A list of inputs to write to the pipe.
sourcepub fn register_branching_inputs<O: Send + 'static>(
&mut self,
name: impl Into<String>,
writers: Vec<PipeWriter<O>>,
inputs: Vec<Vec<O>>
)
pub fn register_branching_inputs<O: Send + 'static>( &mut self, name: impl Into<String>, writers: Vec<PipeWriter<O>>, inputs: Vec<Vec<O>> )
Register a set of inputs to be written to a provided pipe.
This effectively creates a producer stage internally, looping over each input and writing it to the pipe.
Arguments
name
- For debugging purposes; provide an identifying string for this stage.writer
- Created by Pipes::create_io, where each input is written to.inputs
- A list of inputs to write to the pipe.
sourcepub fn register_producer<O, F, Fut>(
&mut self,
name: impl Into<String>,
writer: PipeWriter<O>,
task_definition: F
)
pub fn register_producer<O, F, Fut>( &mut self, name: impl Into<String>, writer: PipeWriter<O>, task_definition: F )
Register a new stage in the pipeline that produces values and writes them into a pipe.
The producer will continue producing values until the user-provided task function returns None
.
This means that it is possible to create an infinite stream of values by simply never returning None
.
Arguments
name
- For debugging purposes; provide an identifying string for this stage.writer
- Created by Pipes::create_io, where produced output is written to.task_definition
- An async function that when executed produces a single output value.
If the output value returned by the task definition is Some(...)
, it is written to
the provided pipe. Otherwise, if the output value is None
, the producer terminates.
sourcepub fn register_branching_producer<O, F, Fut>(
&mut self,
_name: impl Into<String>,
writers: Vec<PipeWriter<O>>,
task_definition: F
)
pub fn register_branching_producer<O, F, Fut>( &mut self, _name: impl Into<String>, writers: Vec<PipeWriter<O>>, task_definition: F )
Register a new stage in the pipeline that produces multiple values and writes them into their respective pipe.
The producer will continue producing values until the user-provided task function returns None
.
This means that it is possible to create an infinite stream of values by simply never returning None
.
Arguments
name
- for debugging purposes; provide an identifying string for this stage.writers
- created by Pipes::create_io, where produced outputs are written to. The position of each output in the returned vector maps directly to the position of the writer in thewriters
vector provided.task_definition
- an async function that when executed produces a list of output values.
If the output returned by the task definition is Some(vec![...])
each value in the vector is
written to the respective pipe. Otherwise, if the output value is None
, the producer terminates.
sourcepub fn register<I, O, F, Fut>(
&mut self,
name: impl Into<String>,
reader: PipeReader<I>,
writer: PipeWriter<O>,
task_definition: F
)
pub fn register<I, O, F, Fut>( &mut self, name: impl Into<String>, reader: PipeReader<I>, writer: PipeWriter<O>, task_definition: F )
Register a new stage in the pipeline that operates on incoming data and writes the result to a single output pipe.
This effectively provides a means to do a “mapping” transformation on incoming data, with an additional
capability to filter it by returning None
in the task definition.
Arguments
name
- for debugging purposes; provide an identifying string for this stage.reader
- Created by Pipes::create_io, where incoming data is read from.writer
- Created by Pipes::create_io, where the task’s output is written to.task_definition
- An async function that operates on input received from thereader
and returns an output that is written to thewriter
.
If the output returned by the task definition is Some(...)
, that value will be written.
Otherwise, if the output value is None
, that value will not be written.
sourcepub fn register_branching<I, O, F, Fut>(
&mut self,
name: impl Into<String>,
reader: PipeReader<I>,
writers: Vec<PipeWriter<O>>,
task_definition: F
)
pub fn register_branching<I, O, F, Fut>( &mut self, name: impl Into<String>, reader: PipeReader<I>, writers: Vec<PipeWriter<O>>, task_definition: F )
Register a new stage in the pipeline that operates on reads incoming data and writes the results to its respective output pipe.
This effectively provides a means to do a “mapping” transformation on incoming data, with an additional
capability to filter it by returning None
in the task definition.
Arguments
name
- for debugging purposes; provide an identifying string for this stage.reader
- Created by Pipes::create_io, where incoming data is read from.writer
- Created by Pipes::create_io, where the task’s output is written to. The position of each output in the returned vector maps directly to the position of the writer in thewriters
vector provided.task_definition
- An async function that operates on input received from thereader
and returns a list of outputs where each is written to its respectivewriter
.
For each output, if the task definition returns Some(...)
, that value will be written.
Otherwise, if it is None
, that value will not be written.
sourcepub fn register_consumer<I, F, Fut>(
&mut self,
name: impl Into<String>,
reader: PipeReader<I>,
task_definition: F
)
pub fn register_consumer<I, F, Fut>( &mut self, name: impl Into<String>, reader: PipeReader<I>, task_definition: F )
Register a new stage in the pipeline that consumes incoming data from a pipe.
This acts as a “leaf” stage where the data flowing through the pipeline stops flowing at.
Arguments
name
- for debugging purposes; provide an identifying string for this stage.reader
- Created by Pipes::create_io, where incoming data is read from.writer
- Created by Pipes::create_io, where the task’s output is written to.task_definition
- An async function that operates on input received from thereader
and returns an output that is written to thewriter
.
If the output returned by the task definition is Some(...)
, that value will be written.
Otherwise, if the output value is None
, that value will not be written.